package com.xingfei.blog.controller.message;

import com.alibaba.dubbo.common.utils.StringUtils;
import com.xingfei.blog.activemq.producer.ActiveMQProducer;
import com.xingfei.blog.dto.SecretMessageDTO;
import com.xingfei.blog.dto.SystemMessageDTO;
import com.xingfei.blog.enums.SecretMessageTypeEnum;
import com.xingfei.blog.service.WebUserService;
import com.xingfei.blog.vo.UserVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Created by yhang on 2017/5/2.
 */

@Component
@ServerEndpoint(value = "/web",configurator = GetHttpSessionConfigurator.class)
public class WebSocket extends SpringBootBeanAutowiringSupport {
    
    private Long userId;

    //concurrent包的线程安全Set，用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话，可以使用Map来存放，其中Key可以为用户标识
    private static final Set<WebSocket> connections = new CopyOnWriteArraySet<>();

    private String nickname;

    private Session session;

    @Autowired
    private ActiveMQProducer activeMQProducer;

    @Autowired
    private WebUserService webUserService;

    /**
     * 创建连接时间调用的方法
     * @param session
     */
    @OnOpen
    public void start(Session session,EndpointConfig config) {
        String token= (String) config.getUserProperties().get(String.class.getName());
        if (StringUtils.isNotEmpty(token)) {
            UserVO userVO = webUserService.getUser4Redis(token);
            userId = userVO.getId();
            nickname=userVO.getPhone();
        }
        this.session = session;
        connections.add(this);
        String message = String.format("* %s %s", nickname, "已上线");
        //上线通知
        //broadcast(message);
        /*try {
            //系统问候语
            SendHello(this.nickname);
            //返回在线用户
            onlineList();
        } catch (IOException e) {
            e.printStackTrace();
        }*/
    }

    /**
     * 私信调用方法
     * @param message
     */
    @OnMessage
    public void incoming(String message) {
        SecretMessageDTO messageDTO = new SecretMessageDTO();
        boolean flag = false;
        if(message.contains("to")){
            String receiveId = message.substring(message.indexOf("to")+2);
            String[] arr = message.split("to");
            //引入异步消息队列存储消息
            //判断指定用户是否离线
            Iterator<WebSocket> it = connections.iterator();
            while (it.hasNext()) {
                WebSocket websocket = it.next();
                if((websocket.userId+"").equalsIgnoreCase(receiveId)){
                    flag = true;
                }
            }
            if(flag){
                //在线直接发送  点对点发送
                broadcastOneToOne(message,nickname);
                messageDTO.setCreateTime(new Date());
                messageDTO.setReceiveId(Long.valueOf(receiveId));
                messageDTO.setMessageContent(arr[0]);
                messageDTO.setSendId(userId);
                messageDTO.setIsRead(1+"");
                activeMQProducer.sendQueueMessage(SecretMessageTypeEnum.SECRET_MESSAGE.toString(),messageDTO);
                //producer.sendTopic(messageDTO);
            }else{
                //离线不发送,直接存储
                messageDTO.setCreateTime(new Date());
                messageDTO.setReceiveId(Long.valueOf(receiveId));
                messageDTO.setMessageContent(arr[0]);
                messageDTO.setSendId(userId);
                messageDTO.setIsRead(0+"");
                activeMQProducer.sendQueueMessage(SecretMessageTypeEnum.SECRET_MESSAGE.toString(),messageDTO);
            }
        }else{
            //群发 功能无用
            broadcast(message);
        }
    }

    /**
     * 系统消息推送调用
     * @param message
     */
    public void topicWS(String message) {
        SystemMessageDTO messageDTO = new SystemMessageDTO();
        boolean flag = false;
        //判断是否包含to代码可以去掉 需要优化
        if(message.contains("to")){
            String receiveId = message.substring(message.indexOf("to")+2);
            String[] arr = message.split("to");
            Iterator<WebSocket> it = connections.iterator();
            while (it.hasNext()) {
                WebSocket websocket = it.next();
                if((websocket.userId+"").equalsIgnoreCase(receiveId)){
                    flag = true;
                }
            }
            if(flag){
                System.out.println("废弃功能");
            }else{
                System.out.println("废弃功能");
            }
        }else{
            //群发
            broadcast(message);
            //在线直接发送  点对点发送
            broadcastOneToOne(message,nickname);
            messageDTO.setSendTime(new Date());
            messageDTO.setSystemTopic("系统消息");
            messageDTO.setSystemContent(message);
            activeMQProducer.sendTopic(messageDTO);
        }
    }

    /**
     * 关注消息推送调用
     */
    public void attentionWS(String message) {
        boolean flag = false;
        if(message.contains("to")){
            String receiveId = message.substring(message.indexOf("to")+2);
            String[] arr = message.split("to");
            //引入异步消息队列存储消息
            //判断指定用户是否离线
            Iterator<WebSocket> it = connections.iterator();
            while (it.hasNext()) {
                WebSocket websocket = it.next();
                if((websocket.userId+"").equalsIgnoreCase(receiveId)){
                    flag = true;
                }
            }
            if(flag){
                //在线直接发送  点对点发送
                broadcastOneToOne(message,nickname);
            }
        }
    }
    /**
     * 链接关闭时调用方法
     */
    @OnClose
    public void end() {
        connections.remove(this);
        String message = String.format("* %s %s", nickname, "已离线");
        broadcast(message);
    }

    /**
     * 发生错误是调用方法
     * @param t
     * @throws Throwable
     */
    @OnError
    public void onError(Throwable t) throws Throwable {
        System.out.println("错误: " + t.toString());
    }
    /**
     * 消息广播
     * 通过connections，对所有其他用户推送信息的方法
     * @param msg
     */
    private static void broadcast(String msg) {
        for (WebSocket client : connections) {
            try {
                synchronized (client) {
                    client.session.getBasicRemote().sendText(msg);
                }
            } catch (IOException e) {
                System.out.println("错误:向客户端发送消息失败");
                connections.remove(client);
                try {
                    client.session.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                String message = String.format("* %s %s", client.nickname,"已离线");
                broadcast(message);
            }
        }
    }
    /**
     * 点对点发送消息
     * 通过connections，对所有其他用户推送信息的方法
     * @param msg
     */
    private static void broadcastOneToOne(String msg, String nickName) {
        //[* 用户0 你好, 用户1]
        String[] arr = msg.split("to");
        for (WebSocket client : connections) {
            try {
                //nickname 用户0
                if(arr.length>1){
                    if(StringUtils.isNotEmpty(nickName)){
                        if(arr[1].equals(client.nickname) || nickName.equals(client.nickname)){
                            synchronized (client) {
                                client.session.getBasicRemote().sendText(arr[0]);
                            }
                        }
                    }else {
                        synchronized (client) {
                            client.session.getBasicRemote().sendText(arr[0]);
                        }
                    }

                }
            } catch (IOException e) {
                System.out.println("错误:向客户端发送消息失败");
                connections.remove(client);
                try {
                    client.session.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                String message = String.format("* %s %s", client.nickname,"已离线");
                broadcast(message);
            }
        }
    }
    //系统问候语
    private static void SendHello(String nickName) throws IOException{
        String m = String.format("* %s %s", nickName, "你好");
        for (WebSocket client : connections) {
            if(client.nickname.equals(nickName)){
                client.session.getBasicRemote().sendText(m);
            }
        }
    }
    //在线用户
    private static void onlineList() throws IOException{
        String online = "";
        for (WebSocket client : connections) {
            if(online.equals("")){
                online = client.nickname;
            }else{
                online += ","+client.nickname;
            }
        }
        String m = String.format("* %s %s", "当前在线用户", online);
        for (WebSocket client : connections) {
            client.session.getBasicRemote().sendText(m);
        }
    }
}
